package test;

import bean.ClickLog;
import util.MyHBaseSink;
import util.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

/** Pipeline:
 *   frontend logs -> nginx -> log collection service (splitting) -> kafka(ods_itcast_click_log)
 *   -> UniqueVisitApp (cleansing)        -> kafka(dwd_itcast_click_log)
 *   -> UniqueVisitApp (unique visitors)  -> kafka(dwm_itcast_unique_visit) -> HBase (clicklog uv)
 */
public class UniqueVisitApp {
    public static void main(String[] args) throws Exception {
        //TODO 1. Basic environment setup
        //1.1 Local streaming environment for testing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        //1.2 Parallelism 1 keeps event order deterministic for local testing
        env.setParallelism(1);

        //1.3 Checkpointing (disabled for local runs; enable for production)
//        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // A checkpoint must complete within 1 minute
//        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Persist checkpoints to HDFS via the filesystem state backend
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop106:8020/gmall/checkpoint/uniquevisit"));

        //TODO 2. Read raw click logs from Kafka
        // FIX: sourceTopic was the placeholder "0."; per the pipeline described in the
        // class javadoc, the raw (ODS) click logs live in "ods_itcast_click_log".
        String sourceTopic = "ods_itcast_click_log";
        String sourceTopic2 = "dwd_itcast_click_log";
        String groupId = "unique_visit_app_group";
        String sinkTopic = "dwm_itcast_unique_visit";
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, groupId);
        // Re-read the topic from the beginning on every (re)start; Flink tracks
        // offsets in its own state, so the shared groupId across consumers is harmless.
        kafkaSource.setStartFromEarliest();
        DataStreamSource<String> strDS = env.addSource(kafkaSource);

        //TODO 3. Parse the raw space-delimited log line into a ClickLog bean.
        // Token layout (assumed nginx access-log format — TODO confirm against the
        // collector): [0]=uid [1]=ip [3..4]=trackTime "[dd/MMM/yyyy:HH:mm:ss +zzzz]"
        // [5..7]=request line [8]=status [9]=port [10]=referer [11]=user agent.
        // Token [2] is intentionally skipped (presumably the "-" remote-user field).
        SingleOutputStreamOperator<ClickLog> clickLogDS = strDS.map(new MapFunction<String, ClickLog>() {
            @Override
            public ClickLog map(String s) throws Exception {
                String[] s1 = s.split(" ");
                return new ClickLog(s1[0], s1[1], s1[3] + " " + s1[4], s1[5] + " " + s1[6] + " " + s1[7], s1[8], s1[9], s1[10], s1[11]);
            }
        });
        clickLogDS.print("ClickLogDS");

        // Cleansing: keep only records that carry a uid; records without one are
        // mapped to null and dropped by the filter below.
        SingleOutputStreamOperator<String> dwdItcastClickLog = clickLogDS.map(new MapFunction<ClickLog, String>() {
            @Override
            public String map(ClickLog clickLog) throws Exception {
                if (clickLog.getUid() != null && !clickLog.getUid().equals("")) {
                    return clickLog.getUid() + " " +
                            clickLog.getIp() + " " +
                            clickLog.getTrackTime() + " " +
                            clickLog.getREQUESTS() + " " +
                            clickLog.getREQUEST_STATUS() + " " +
                            clickLog.getPORT() + " " +
                            clickLog.getREFER_DOMAIN_ARRAY() + " " +
                            clickLog.getUSER_AGENTS();
                }
                return null;
            }
        });

        // FIX: drop the null placeholders before sinking — the Kafka producer
        // cannot serialize null elements and would fail at runtime.
        dwdItcastClickLog
                .filter(value -> value != null)
                .addSink(MyKafkaUtil.getKafkaSink(sourceTopic2));

        // Read the cleansed DWD records back from Kafka.
        FlinkKafkaConsumer<String> kafkaSource2 = MyKafkaUtil.getKafkaSource(sourceTopic2, groupId);
        kafkaSource2.setStartFromEarliest();
        DataStreamSource<String> strDS2 = env.addSource(kafkaSource2);

        // DWD layout (no skipped token, unlike the raw feed):
        // [0]=uid [1]=ip [2..3]=trackTime [4..6]=request [7]=status [8]=port [9]=referer [10]=agent.
        SingleOutputStreamOperator<ClickLog> clickLogDS2 = strDS2.map(new MapFunction<String, ClickLog>() {
            @Override
            public ClickLog map(String s) throws Exception {
                String[] s1 = s.split(" ");
                return new ClickLog(s1[0], s1[1], s1[2] + " " + s1[3], s1[4] + " " + s1[5] + " " + s1[6], s1[7], s1[8], s1[9], s1[10]);
            }
        });

//        clickLogDS2.print("ClickLogDS2");

        //TODO 4. Key by uid so per-visitor state can be kept
        KeyedStream<ClickLog, String> keybyWithUidDS = clickLogDS2.keyBy(data -> data.getUid());
//        keybyWithUidDS.print("keyed");

        //TODO 5. Filter down to unique visitors (UV)
        SingleOutputStreamOperator<ClickLog> filteredDS = keybyWithUidDS.filter(
                new RichFilterFunction<ClickLog>() {
                    // Last visit time for the current uid (keyed state, Redis-like role)
                    ValueState<String> lastVisitDateState = null;
                    // Date parser; one instance per operator subtask, so the
                    // non-thread-safe SimpleDateFormat is never shared.
                    SimpleDateFormat sdf = null;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // trackTime looks like "[17/Jul/2021:10:00:00 +0800]" — ENGLISH
                        // locale is required for the "MMM" month abbreviation.
                        sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
                        ValueStateDescriptor<String> lastVisitDateStateDes =
                                new ValueStateDescriptor<>("lastVisitDateState", String.class);
                        // We count daily active users (DAU), so state is only useful for
                        // one day. The default TTL config refreshes the timestamp on
                        // create and write, and never returns expired state to callers.
                        StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1)).build();
                        lastVisitDateStateDes.enableTimeToLive(stateTtlConfig);
                        this.lastVisitDateState = getRuntimeContext().getState(lastVisitDateStateDes);
                    }

                    @Override
                    public boolean filter(ClickLog clickLog) throws Exception {

                        // Current visit time of this record
                        String lastDate = clickLog.getTrackTime();
                        // substring(1,21) strips the leading '[' and the " +zzzz]" tail,
                        // leaving exactly "dd/MMM/yyyy:HH:mm:ss" (20 chars) for parsing.
                        Date date = sdf.parse(lastDate.substring(1, 21));
                        long time = date.getTime();
                        // flag == true when the stored visit is strictly older than this one
                        boolean flag = false;
                        String lastVisitDate = lastVisitDateState.value();
                        if (lastVisitDate != null) {
                            Date lastdate = sdf.parse(lastVisitDate.substring(1, 21));
                            long lasttime = lastdate.getTime();
                            flag = time - lasttime > 0;
                        }

                        // Compare this record's visit time against the stored state:
                        // a later visit from a known uid is a repeat -> drop it.
                        if (lastVisitDate != null && lastVisitDate.length() > 0 && flag) {
//                            System.out.println("已访问：lastVisitDate-" + lastVisitDate + ",||lastDate:" + lastDate);
                            return false;
                        } else {
                            // First visit within the TTL window: remember it and keep the record
//                            System.out.println("未访问：lastVisitDate-" + lastVisitDate + ",||lastDate:" + lastDate);
                            lastVisitDateState.update(lastDate);
                            return true;
                        }
                    }
                }
        );

//        filteredDS.print("filteredDS");

        //TODO 6. Write back to Kafka — ClickLog must be flattened to a String first
        //6.1 ClickLog -> space-delimited String (same layout as the DWD topic)
        SingleOutputStreamOperator<String> kafkaDS = filteredDS.map(
                new MapFunction<ClickLog, String>() {
                    @Override
                    public String map(ClickLog clickLog) throws Exception {
                        return clickLog.getUid() + " " +
                                clickLog.getIp() + " " +
                                clickLog.getTrackTime() + " " +
                                clickLog.getREQUESTS() + " " +
                                clickLog.getREQUEST_STATUS() + " " +
                                clickLog.getPORT() + " " +
                                clickLog.getREFER_DOMAIN_ARRAY() + " " +
                                clickLog.getUSER_AGENTS();

                    }
                }
        );


        //6.2 Write to the DWM layer topic
        kafkaDS.addSink(MyKafkaUtil.getKafkaSink(sinkTopic));

        //6.3 Persist dwm_unique_visit to HBase (table "clicklog", cf "uv")
        FlinkKafkaConsumer<String> kafkaSource3 = MyKafkaUtil.getKafkaSource(sinkTopic, groupId);
        kafkaSource3.setStartFromEarliest();
        DataStreamSource<String> strDS3 = env.addSource(kafkaSource3);


        // Re-join the 11 space-separated tokens with '`' as the field delimiter
        // expected by the HBase sink (multi-token fields keep their inner spaces).
        SingleOutputStreamOperator<String> clickLogDS3 = strDS3.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                String[] s1 = s.split(" ");
                return s1[0] + "`" + s1[1] + "`" + s1[2] + " " + s1[3] + "`" + s1[4] + " " + s1[5] + " " + s1[6] + "`" + s1[7] + "`" + s1[8] + "`" + s1[9] + "`" + s1[10];
            }
        });
        clickLogDS3.print("ClickLogDS3");
//        clickLogDS3.addSink(new MyHBaseSink(ClickLog.class,"clicklog"));

        env.execute();
    }
}
